
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION;

public class DwdTempDevice {
    public static void main(String[] args) {
        //TODO 1.创建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(3);

        //TODO 3.创建数据源Source
        String connector = "mysql-cdc";
        String hostname = "heukrftr7x7cmlu6no4g.rwlb.rds.aliyuncs.com";
        String port = "3306";
        String username = "zhhuang4";
        String password = "HZfR1cqrRcZC^";
        String databaseName = "gfs_tms_device";
        String tableName = "gtd_device_temp_data_.*";

        String createSql = "CREATE TABLE IF NOT EXISTS gtd_device_temp_data (\n" +
                "    `id` BIGINT NOT NULL,\n" +
                "    `device_code` STRING COMMENT '设备编号',\n" +
                "    `device_name` STRING COMMENT '设备名称',\n" +
                "    `temp_time` TIMESTAMP(3) COMMENT '温度时间',\n" +
                "`window_time`  AS TO_TIMESTAMP(DATE_FORMAT(temp_time, 'yyyy-MM-dd HH:mm:ss')),\n" +
                "    `temp1` DECIMAL(5,2) COMMENT '温度1',\n" +
                "    `temp2` DECIMAL(5,2) COMMENT '温度2',\n" +
                "    `temp3` DECIMAL(5,2) COMMENT '温度3',\n" +
                "    `temp4` DECIMAL(5,2) COMMENT '温度4',\n" +
                "    `create_time` TIMESTAMP COMMENT '创建时间',\n" +
                "    `table_name`  String METADATA FROM 'table_name' VIRTUAL,\n" +
                "     WATERMARK FOR window_time AS window_time - INTERVAL '30' SECOND ,\n" +
                "    PRIMARY KEY ( `id` ) NOT ENFORCED\n" +
                ")";
        String withSql =
                "WITH (\n" +
                        "'connector' = '" + connector + "',\n" +
                        "'hostname' = '" + hostname + "',\n" +
                        "'port' = '" + port + "',\n" +
                        "'username' = '" + username + "',\n" +
                        "'password' = '" + password + "',\n" +
                        "'database-name' = '" + databaseName + "',\n" +
                        "'table-name' = '" + tableName + "',\n" +
                        "'debezium.snapshot.mode' = 'initial' " +
                        ");";

        tableEnv.executeSql(createSql + withSql);
//        tableEnv.executeSql("select * from gtd_device_temp_data").print();
        tableEnv.executeSql("CREATE TABLE IF NOT EXISTS ods_gtd_device_temp_data (\n" +
                "    `device_code` STRING COMMENT '设备编号',\n" +
                "    `create_day` DATE COMMENT '最大创建日期',\n" +
                "    `device_name` STRING COMMENT '设备名称',\n" +
                "    `temp1` DECIMAL(5,2) COMMENT '温度1',\n" +
                "    `temp2` DECIMAL(5,2) COMMENT '温度2',\n" +
                "    `temp3` DECIMAL(5,2) COMMENT '温度3',\n" +
                "    `temp4` DECIMAL(5,2) COMMENT '温度4',\n" +
                "    `window_start` TIMESTAMP COMMENT '温度开始时间',\n" +
                "    `window_end` TIMESTAMP COMMENT '温度结束时间',\n" +
                "    PRIMARY KEY ( `device_code`,`create_day` ) NOT ENFORCED\n" +
                ") WITH (\n" +
                "'connector' = 'doris',\n" +
                "  'fenodes' = '172.19.2.250:8030',\n" +
                "  'table.identifier' = 'test_dw.ods_gtd_device_temp_data',\n" +
                "  'username' = 'zhhuang4',\n" +
                "\t'sink.properties.format' = 'json',\n" +
                "\t'sink.enable-2pc' = 'false',\n" +
                "\t'sink.properties.read_json_by_line' = 'true',\n" +
                "\t'sink.buffer-size' = '6',\n" +
                "\t'sink.buffer-count' = '4' ,\n" +
                "  'password' = 'HZfR1cqrRcZC^')");
        Table table = tableEnv.sqlQuery
                ("SELECT\t`device_code`\n" +
                " \t,CAST(MAX(`create_time`) AS DATE) AS create_day\n" +
                "\t,`device_name`\n" +
//                "\t,table_name\n" +
                " \t,AVG(`temp1`) AS avg_temp1\n" +
                " \t,AVG(`temp2`) AS avg_temp2\n" +
                " \t,AVG(`temp3`) AS avg_temp2\n" +
                " \t,AVG(`temp4`) AS avg_temp2\n" +
                "\t,TUMBLE_START(`window_time`, INTERVAL '3' MINUTE) AS `window_start`  \n" +
                " \t,TUMBLE_END(`window_time`, INTERVAL '3' MINUTE) AS `window_end`\n" +
                "FROM gtd_device_temp_data \n" +
                "GROUP BY TUMBLE(`window_time`, INTERVAL '3' MINUTE), `device_code`,`device_name` ")//                .print()
        ;
        table.executeInsert("ods_gtd_device_temp_data");
    }
}
